import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.ArrayList;

/**
 * @author wangzj
 * @description
 * @date 2020/7/8 0:21
 */
public class SqlWordCount {
    public static void main(String[] args) throws Exception {
        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //创建表的操作环境
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        //创建一行模拟数据作为输入
        String words = "hello flink hello lagou";
        String[] split = words.split("\\W+");
        ArrayList<WC> list = new ArrayList<>();
        for (String word : split) {
            WC wc = new WC(word, 1L);
            list.add(wc);
        }
        //将list数据转换成DataSet
        DataSet<WC> input = env.fromCollection(list);
        //DataSet转sql，并指定字段名
        Table table = tableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();
        //注册为一个表
        tableEnv.createTemporaryView("words", table);
        //查询结果
        Table result = tableEnv.sqlQuery("select word,sum(frequency) as frequency from words group by word");
        //将表的结果转成dataset输出
        DataSet<WC> dataSetResult = tableEnv.toDataSet(result, WC.class);
        dataSetResult.printToErr();
    }

    /**
     * 单词为key-value的形式获取值
     */
    public static class WC {
        public String word;
        public long frequency;

        public WC() {
        }

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return word + ", " + frequency;
        }
    }
}
